package backend.node;

import backend.core.component.Lifecycle;
import backend.env.Environment;
import backend.env.NodeEnvironment;
import backend.env.NodeMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * A node whose {@link #start()} method brings up the node's services in a fixed order.
 * Most collaborators are resolved from the injector at start time.
 */
public class Node {
    /** Tracks the lifecycle state of this node; consulted by {@link #start()} to make it idempotent. */
    private final Lifecycle lifecycle = new Lifecycle();

    private final Logger logger = LoggerFactory.getLogger(Node.class);

    /** Environment consulted for readiness-service enablement, bootstrap context and remote-cluster settings. */
    private final Environment environment;
    /** Provides the node data paths used when verifying the on-disk node metadata. */
    private final NodeEnvironment nodeEnvironment;

    /**
     * Start the node. If the node is already started, this method is a no-op.
     *
     * <p>Services are started in a deliberate order (see the inline comments); if the configured initial
     * state timeout is positive and no master is known yet, the calling thread blocks until a master is
     * discovered, the cluster service closes, or the timeout elapses.
     *
     * @return this node
     * @throws NodeValidationException declared by this method; presumably raised by the pre-request
     *         validation step (bootstrap checks) - confirm against {@code validateNodeBeforeAcceptingRequests}
     */
    public Node start() throws NodeValidationException {
        // moveToStarted() returning false means there is nothing to do for this call.
        if (lifecycle.moveToStarted() == false) {
            return this;
        }

        logger.info("starting ...");

        if (ReadinessService.enabled(environment)) {
            injector.getInstance(ReadinessService.class).start();
        }
        injector.getInstance(MappingUpdatedAction.class).setClient(client);
        injector.getInstance(IndicesService.class).start();
        injector.getInstance(IndicesClusterStateService.class).start();
        injector.getInstance(SnapshotsService.class).start();
        injector.getInstance(SnapshotShardsService.class).start();
        injector.getInstance(RepositoriesService.class).start();
        injector.getInstance(SearchService.class).start();
        injector.getInstance(FsHealthService.class).start();
        nodeService.getMonitorService().start();

        final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
        nodeConnectionsService.start();
        clusterService.setNodeConnectionsService(nodeConnectionsService);

        injector.getInstance(GatewayService.class).start();
        final Coordinator coordinator = injector.getInstance(Coordinator.class);
        clusterService.getMasterService().setClusterStatePublisher(coordinator);

        // Start the transport service now so the publish address will be added to the local disco node in ClusterService
        TransportService transportService = injector.getInstance(TransportService.class);
        transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
        transportService.getTaskManager().setTaskCancellationService(new TaskCancellationService(transportService));
        transportService.start();
        assert localNodeFactory.getNode() != null;
        assert transportService.getLocalNode().equals(localNodeFactory.getNode())
                : "transportService has a different local node than the factory provided";
        injector.getInstance(PeerRecoverySourceService.class).start();

        // Load (and maybe upgrade) the metadata stored on disk
        final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
        gatewayMetaState.start(
                settings(),
                transportService,
                clusterService,
                injector.getInstance(MetaStateService.class),
                injector.getInstance(IndexMetadataVerifier.class),
                injector.getInstance(MetadataUpgrader.class),
                injector.getInstance(PersistedClusterStateService.class),
                pluginsService.filterPlugins(ClusterCoordinationPlugin.class).toList(),
                injector.getInstance(CompatibilityVersions.class)
        );
        // TODO: Do not expect that the legacy metadata file is always present https://github.com/elastic/elasticsearch/issues/95211
        // Assertion-only sanity checks of the on-disk node metadata; skipped entirely for stateless nodes.
        if (Assertions.ENABLED && DiscoveryNode.isStateless(settings()) == false) {
            try {
                assert injector.getInstance(MetaStateService.class).loadFullState().v1().isEmpty();
                final NodeMetadata nodeMetadata = NodeMetadata.FORMAT.loadLatestState(
                        logger,
                        NamedXContentRegistry.EMPTY,
                        nodeEnvironment.nodeDataPaths()
                );
                assert nodeMetadata != null;
                assert nodeMetadata.nodeVersion().equals(BuildVersion.current());
                assert nodeMetadata.nodeId().equals(localNodeFactory.getNode().getId());
            } catch (IOException e) {
                assert false : e;
            }
        }
        // we load the global state here (the persistent part of the cluster state stored on disk) to
        // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
        final Metadata onDiskMetadata = gatewayMetaState.getPersistedState().getLastAcceptedState().metadata();
        assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null
        validateNodeBeforeAcceptingRequests(
                new BootstrapContext(environment, onDiskMetadata),
                transportService.boundAddress(),
                pluginsService.flatMap(Plugin::getBootstrapChecks).toList()
        );

        final FileSettingsService fileSettingsService = injector.getInstance(FileSettingsService.class);
        fileSettingsService.start();
        // if we are using the readiness service, listen for the file settings being applied
        if (ReadinessService.enabled(environment)) {
            fileSettingsService.addFileChangedListener(injector.getInstance(ReadinessService.class));
        }

        clusterService.addStateApplier(transportService.getTaskManager());
        // start after transport service so the local disco is known
        coordinator.start(); // start before cluster service so that it can set initial state on ClusterApplierService
        clusterService.start();
        assert clusterService.localNode().equals(localNodeFactory.getNode())
                : "clusterService has a different local node than the factory provided";
        transportService.acceptIncomingRequests();
        /*
         * CoordinationDiagnosticsService expects to be able to send transport requests and use the cluster state, so it is important to
         * start it here after the clusterService and transportService have been started.
         */
        injector.getInstance(CoordinationDiagnosticsService.class).start();
        coordinator.startInitialJoin();
        final TimeValue initialStateTimeout = INITIAL_STATE_TIMEOUT_SETTING.get(settings());
        configureNodeAndClusterIdStateListener(clusterService);

        // A non-positive timeout disables waiting for the initial cluster state altogether.
        if (initialStateTimeout.millis() > 0) {
            final ThreadPool thread = injector.getInstance(ThreadPool.class);
            ClusterState clusterState = clusterService.state();
            ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext());

            if (clusterState.nodes().getMasterNodeId() == null) {
                logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);
                // Every listener callback releases the latch, so the untimed await() below is bounded by the
                // timeout handed to the observer rather than by the latch itself.
                final CountDownLatch latch = new CountDownLatch(1);
                observer.waitForNextChange(new ClusterStateObserver.Listener() {
                    @Override
                    public void onNewClusterState(ClusterState state) {
                        latch.countDown();
                    }

                    @Override
                    public void onClusterServiceClose() {
                        latch.countDown();
                    }

                    @Override
                    public void onTimeout(TimeValue timeout) {
                        logger.warn("timed out while waiting for initial discovery state - timeout: {}", initialStateTimeout);
                        latch.countDown();
                    }
                }, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout);

                try {
                    latch.await();
                } catch (InterruptedException e) {
                    // Restore the interrupt status: catching InterruptedException clears it, and code further
                    // up the stack must still be able to observe that this thread was interrupted.
                    Thread.currentThread().interrupt();
                    // NOTE(review): the cause is not attached here; pass `e` along if the exception type
                    // offers a cause-accepting constructor - confirm before changing.
                    throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
                }
            }
        }

        injector.getInstance(HttpServerTransport.class).start();

        if (WRITE_PORTS_FILE_SETTING.get(settings())) {
            TransportService transport = injector.getInstance(TransportService.class);
            writePortsFile("transport", transport.boundAddress());
            HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
            writePortsFile("http", http.boundAddress());

            if (ReadinessService.enabled(environment)) {
                // The readiness ports file is written from a listener callback, not inline here.
                ReadinessService readiness = injector.getInstance(ReadinessService.class);
                readiness.addBoundAddressListener(address -> writePortsFile("readiness", address));
            }

            if (RemoteClusterPortSettings.REMOTE_CLUSTER_SERVER_ENABLED.get(environment.settings())) {
                writePortsFile("remote_cluster", transport.boundRemoteAccessAddress());
            }
        }

        injector.getInstance(NodeMetrics.class).start();
        injector.getInstance(HealthPeriodicLogger.class).start();

        logger.info("started {}", transportService.getLocalNode());

        // Notify cluster plugins only after everything above has been started.
        pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);

        return this;
    }


}
